import json
import time
import logging
from kafka import KafkaConsumer
from kafka import KafkaProducer

# define the func to transfer the json
def tran_json(input):
    result= {}
    data = json.loads(input)

    # get "app_package_name", "app_type", "app_uuid" and "auth_passed"
    result["app_package_name"]=data['app_package_name']
    result["app_type"]=data['app_type']
    result["app_uuid"]=data['app_uuid']
    result["auth_passed"]=data['auth_passed']

    # make the "client" node
    result["client"]={}
    result["client"]["dev_manufacturer"]=data["client"]["dev_manufacturer"]
    result["client"]["dev_model"]=data["client"]["dev_model"]

    # make the "common" node
    result["common"]={}
    if data["app_type"] == 100:
        if data["client"]["user_agent"] == "":
            result["common"]["os"]="UNKNOWN"
            result["common"]["platform"]="H5"
        elif data["client"]["user_agent"] == "iOS":
            result["common"]["os"] = "iOS"
            result["common"]["platform"] = "iOS"
        elif data["client"]["user_agent"] == "Android":
            result["common"]["os"] = "Android"
            result["common"]["platform"] = "Android"
    result["common"]["version"]=data["version"]

    # get "country"
    result["country"]=data['country']

    # make the "location" node
    result["location"]={}
    try:
        result["location"]["lat"]=float(data['latitude'])
    except:
        logging.error('Cannot convert the latitude: {}'.format(data['latitude']))

    try:
        result["location"]["lng"]=float(data['longtitude'])
    except:
        logging.error('Cannot convert the longtitude: {}'.format(data['longtitude']))

    # get "os_lang" and "remote_addr"
    result["os_lang"]=data['os_lang']
    result["remote_addr"]=data['remote_addr']

    # convert the log_time to the timestampMs
    try:
        log_time = data['log_time']
        timeArray = time.strptime(log_time, "%Y-%m-%d %H:%M:%S")
        timeStamp = int(time.mktime(timeArray) * 1000)
        result["timestampMs"]=timeStamp
    except:
        logging.error('Cannot convert the log_time: {} to the timestampMs'.format(data['log_time']))

    # get "user_id" and "version"
    result["user_id"]=data['user_id']
    result["version"]=data['version']

    return result


if __name__ == '__main__':

    source_topic = "src"
    target_topic = "des"
    server = ['127.0.0.1:9092']
    consumer = KafkaConsumer(source_topic, bootstrap_servers=server, group_id='test', auto_offset_reset='earliest')
    producer = KafkaProducer(bootstrap_servers=server, value_serializer=lambda m: json.dumps(m).encode())

    for msg in consumer:
        if msg is not None:
            input = msg.value
            result = tran_json(input)
            producer.send(target_topic, result)


######### unit_test follows: ###########

# demo_input = '''{
#   "app_package_name": "com.google.app",
#   "app_type": 100,
#   "app_uuid": "6e1343ff1942828",
#   "auth_passed": true,
#   "client": {
#     "dev_manufacturer": "vivo",
#     "dev_model": "vivo 1612",
#     "user_agent": ""
#   },
#   "country": "ID",
#   "latitude": "120.3399",
#   "log_time": "2020-08-15 00:00:02",
#   "longtitude": "30.1211",
#   "os_lang": "in_ID",
#   "remote_addr": "110.139.149.102",
#   "user_id": "8371792",
#   "version": "3.3.1"
# }'''

# print the output and save it to the file
# demo_result = tran_json(demo_input)
# print(demo_result)
# with open('output.json','w+') as f:
#     json.dump(demo_result,f)
